package com.xiaofan.wc

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

/*
bin/flink run-application -t yarn-application \
 -c com.xiaofan.wc.StreamWordCount \
 -Denv.hadoop.conf.dir=/home/hadoop/app/hadoop/etc/hadoop \
 -Dyarn.application.queue="flink" \
 -Djobmanager.memory.process.size=1024m \
 -Dtaskmanager.memory.process.size=1024m \
 -Dtaskmanager.numberOfTaskSlots=4 \
 -Dparallelism.default=4 \
 -Dyarn.application-attempts=4 \
 -Dyarn.application.name="test" \
 hdfs://cluster/flink-1.12/jar/FlinkTurorial1.12.2-1.0-SNAPSHOT-jar-with-dependencies.jar --host 192.168.1.27 --port 9999
 */
/**
 * Streaming word count over a text socket.
 *
 * Command-line arguments (parsed with `ParameterTool.fromArgs`):
 *  - `--host`: host of the socket to read text lines from
 *  - `--port`: port of that socket
 */
object StreamWordCount {

  /**
   * Job entry point: reads lines from the socket given by `--host`/`--port`,
   * counts the words and prints the per-word sums.
   *
   * @param args command-line arguments; must contain `--host` and `--port`
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val parameterTool: ParameterTool = ParameterTool.fromArgs(args)
    // getRequired fails fast with an error naming the missing key, rather than
    // letting a null host travel on into the socket source.
    val host: String = parameterTool.getRequired("host")
    val port: Int = parameterTool.getInt("port")

    val lines: DataStream[String] = env.socketTextStream(host, port)

    // The print sink is pinned to parallelism 1, independent of the job default.
    countWords(lines).print().setParallelism(1)

    env.execute("Stream Word Count 1.12.2")
  }

  /**
   * Splits each line on single spaces, drops empty tokens, pairs every word
   * with a count of 1, keys the stream by word and sums the counts.
   *
   * @param lines stream of raw text lines
   * @return stream of `(word, count)` pairs
   */
  private def countWords(lines: DataStream[String]): DataStream[(String, Int)] =
    lines
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
}
